package com.skyline.courier.mq;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Queue-backed message send client.
 * <p>
 * {@link #send} only encodes the message and appends it to an in-memory,
 * unbounded queue; the actual delivery is performed asynchronously by worker
 * tasks that drain the queue and delegate to {@code sendInternal}.
 * <p>
 * Workers are started with {@link #startWorker()} and stopped with
 * {@link #destroy()}. Messages enqueued while no worker is running simply
 * stay in the queue until workers are started; {@link #destroy()} discards
 * whatever is still queued.
 * <p>
 * Because {@code workThreads} workers drain the queue concurrently,
 * {@code sendInternal} is invoked from several threads at once and the
 * relative delivery order of messages is not guaranteed when more than one
 * worker thread is configured. Configure a single worker thread via
 * {@link #setWorkThreads(int)} if strictly serial delivery is required.
 * 
 * @author wuqh
 * 
 */
public abstract class QueuedMessageSendClient extends AbstractMessageSendClient implements MessageSendClient {
	private static final Logger LOGGER = LoggerFactory.getLogger(QueuedMessageSendClient.class);

	/** Pending messages waiting to be picked up by a worker. */
	private final BlockingQueue<MessageWrapper> queue = new LinkedBlockingQueue<MessageWrapper>();

	/** How long (in milliseconds) a worker waits for a message before re-checking its stop conditions. Default: 1 second. */
	private long pollTimeout = 1000L;
	/** Number of worker threads (and worker tasks) created by {@link #startWorker()}. */
	private int workThreads = (Runtime.getRuntime().availableProcessors() * 2);
	/** Executor owning the current generation of workers; guarded by {@code this}. */
	private ExecutorService workerExecutors;

	/** Global stop flag read by every worker on each loop iteration. */
	private volatile boolean running = false;

	/**
	 * Encodes the message with the given protocol and enqueues it for
	 * asynchronous delivery. This method does not perform the delivery itself.
	 * 
	 * @param key key the message is sent to (passed through to {@code sendInternal})
	 * @param message the object to send
	 * @param protocol encoder used to turn the message into bytes
	 * @param mandatory passed through to {@code sendInternal}
	 * @param direct passed through to {@code sendInternal}
	 */
	@Override
	public <T> void send(String key, T message, Protocol<T, byte[]> protocol, boolean mandatory, boolean direct) {
		if (LOGGER.isTraceEnabled()) {
			LOGGER.trace("发送对象[" + message + "]到 [" + key + "] - mandatory [" + mandatory + "] - immediate [" + direct
					+ "]");
		}
		byte[] data = protocol.encode(message);
		MessageWrapper messageWrapper = new MessageWrapper(data, key, mandatory, direct);
		queue.add(messageWrapper);
	}

	/**
	 * Delivers one dequeued message through {@code sendInternal}.
	 */
	private void sendMessage(MessageWrapper message) {
		sendInternal(message.getKey(), message.getData(), message.isMandatory(), message.isDirect());
	}

	/**
	 * Starts (or restarts) the worker tasks that drain the queue.
	 * <p>
	 * Any previously started workers are asked to stop; they exit after
	 * finishing the message they are currently sending (at the latest after
	 * one poll timeout). One worker task is submitted per configured worker
	 * thread.
	 * 
	 * @throws IllegalArgumentException if the configured number of worker threads is not positive
	 */
	public synchronized void startWorker() {
		// Snapshot so the pool size and the number of submitted tasks always agree.
		int threads = workThreads;
		ExecutorService executor = resetWorkers(threads);
		// The flag must be raised BEFORE the tasks are submitted; otherwise a
		// worker could observe running == false on its first check and exit
		// immediately, leaving the queue without any consumer.
		running = true;
		for (int i = 0; i < threads; i++) {
			executor.execute(new Worker(executor));
		}
	}

	/**
	 * Replaces the current executor with a fresh one and shuts the old one down.
	 * <p>
	 * The new pool is created first so that an invalid thread count fails
	 * before the existing workers are touched.
	 * 
	 * @param threads size of the new fixed thread pool
	 * @return the newly created executor (also stored in {@code workerExecutors})
	 */
	private ExecutorService resetWorkers(int threads) {
		ExecutorService fresh = Executors.newFixedThreadPool(threads, new NamedThreadFactory("MQClientWorker"));
		ExecutorService previous = workerExecutors;
		workerExecutors = fresh;
		if (previous != null) {
			// shutdown() does not interrupt running tasks; the old workers notice
			// it through their owner.isShutdown() check and terminate on their own.
			previous.shutdown();
		}
		return fresh;
	}

	/**
	 * Destroys the message send client: stops the workers, shuts down the
	 * executor and discards every message that is still queued.
	 */
	public synchronized void destroy() {
		running = false;
		if (workerExecutors != null) {
			workerExecutors.shutdown();
		}
		queue.clear();
	}

	/**
	 * Sets how long a worker waits for a message before re-checking whether it
	 * should stop.
	 * 
	 * @param pollTimeout timeout in milliseconds
	 */
	public void setPollTimeout(long pollTimeout) {
		this.pollTimeout = pollTimeout;
	}

	/**
	 * Sets the number of worker threads used by the next call to
	 * {@link #startWorker()}. Already running workers are not affected.
	 * 
	 * @param workThreads number of worker threads
	 */
	public void setWorkThreads(int workThreads) {
		this.workThreads = workThreads;
	}

	/**
	 * Calls {@link #destroy()} when the instance is finalized.
	 */
	@Override
	protected void finalize() throws Throwable {
		try {
			destroy();
		} finally {
			// Always run the superclass finalizer, even if destroy() fails.
			super.finalize();
		}
	}

	/**
	 * Queue consumer: repeatedly polls the queue and delivers each message
	 * until the client is stopped, the executor that owns this worker is shut
	 * down, or the worker thread is interrupted.
	 */
	private final class Worker implements Runnable {
		/** Executor this worker was submitted to; its shutdown retires the worker. */
		private final ExecutorService owner;

		private Worker(ExecutorService owner) {
			this.owner = owner;
		}

		@Override
		public void run() {
			// Checking the owning executor (not only the shared flag) makes sure
			// workers of a previous generation stop after a restart.
			while (running && !owner.isShutdown()) {
				try {
					MessageWrapper message = queue.poll(pollTimeout, TimeUnit.MILLISECONDS);
					if (message != null) {
						sendMessage(message);
					}
				} catch (InterruptedException ie) {
					if (LOGGER.isDebugEnabled()) {
						LOGGER.debug("等待发送消息发生异常");
					}
					// Restore the interrupt status and stop: an interrupt is a
					// request to terminate this worker thread.
					Thread.currentThread().interrupt();
					return;
				} catch (Exception e) {
					// A failed delivery must not kill the worker; log and continue.
					LOGGER.error("发送消息发生异常", e);
				}
			}
		}

	}

	/**
	 * Immutable holder for one queued message and its send options.
	 * 
	 * @author wuqh
	 * 
	 */
	private static final class MessageWrapper {
		private final byte[] data;
		private final String key;
		private final boolean mandatory;
		private final boolean direct;

		private MessageWrapper(byte[] data, String key, boolean mandatory, boolean direct) {
			this.data = data;
			this.key = key;
			this.mandatory = mandatory;
			this.direct = direct;
		}

		public byte[] getData() {
			return data;
		}

		public String getKey() {
			return key;
		}

		public boolean isMandatory() {
			return mandatory;
		}

		public boolean isDirect() {
			return direct;
		}
	}
}
